package org.bdware.sc.conn;

import com.google.gson.JsonPrimitive;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.bdware.sc.ContractResult;
import org.bdware.sc.codec.LengthFieldBasedFrameCodec;
import org.bdware.sc.get.GetMessage;
import org.bdware.sc.util.JsonUtil;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Client that sends {@link GetMessage} requests over a Netty TCP channel and routes the
 * responses back to per-request callbacks held by a {@link SyncResult}.
 *
 * <p>Wire format, as written/read by this class (framing is delegated to
 * {@link LengthFieldBasedFrameCodec}):
 *
 * <ul>
 *   <li>request: an {@link ObjectOutputStream} containing a {@code long} request id followed by
 *       the serialized {@link GetMessage};
 *   <li>response: an {@link ObjectInputStream} containing the {@code long} request id followed by
 *       a serialized {@link String} result.
 * </ul>
 *
 * <p>The channel is connected eagerly in the constructor and lazily re-connected by the request
 * path whenever it is found to be missing or inactive.
 */
public class SocketGet {
    private static final org.apache.logging.log4j.Logger LOGGER =
            org.apache.logging.log4j.LogManager.getLogger(SocketGet.class);
    /** Event loop group shared by every {@code SocketGet} instance. */
    static EventLoopGroup group = new NioEventLoopGroup(8);
    private final String ipAddress;
    private final int port;
    AtomicInteger i = new AtomicInteger(0);
    /**
     * Current connection, or {@code null} once it has gone inactive. Volatile because it is
     * written by the Netty event loop (on disconnect) and read by the calling threads.
     */
    volatile Channel channel;
    Bootstrap b;
    SyncResult r = new SyncResult();
    Random random = new Random();
    OfflineHandler offlineExceptionHandler;

    /**
     * Configures the bootstrap (5 second connect timeout, {@code TCP_NODELAY}) and connects to the
     * given endpoint, blocking until the connection attempt completes.
     *
     * <p>If the calling thread is interrupted while connecting, the interrupt flag is restored and
     * the instance is left without a channel; the next request will try to connect again.
     *
     * @param ipAddress host to connect to
     * @param port TCP port to connect to
     */
    public SocketGet(String ipAddress, int port) {
        this.ipAddress = ipAddress;
        this.port = port;
        b = new Bootstrap();

        // TODO: may be problematic
        b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
        b.group(group);
        b.channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ch.pipeline()
                                        .addLast(new LengthFieldBasedFrameCodec())
                                        .addLast(new ResponseHandler());
                            }
                        });
        try {
            channel = b.connect(ipAddress, port).sync().channel();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently swallowing it.
            Thread.currentThread().interrupt();
            LOGGER.warn("interrupted while connecting to " + ipAddress + ":" + port, e);
        }
    }

    /**
     * Registers the handler that {@link #asyncGet} notifies when a request fails while the
     * connection is down.
     *
     * @param offlineExceptionHandler handler to notify; may be {@code null} to disable
     */
    public void setOfflineExceptionHandler(OfflineHandler offlineExceptionHandler) {
        this.offlineExceptionHandler = offlineExceptionHandler;
    }

    /**
     * Sends one request frame, re-connecting first if the current channel is missing or inactive.
     *
     * @param pkgName forwarded to the {@link GetMessage} constructor
     * @param method forwarded to the {@link GetMessage} constructor
     * @param arg forwarded to the {@link GetMessage} constructor
     * @param id request id written in front of the message
     * @throws InterruptedException if interrupted while waiting for the re-connect
     * @throws IOException if the message cannot be serialized
     */
    private void request(String pkgName, String method, String arg, long id)
            throws InterruptedException, IOException {
        // Work on a local snapshot: the field may be nulled concurrently by the event loop.
        Channel ch = channel;
        if (ch == null || !ch.isActive()) {
            // TODO: may be problematic
            ch = b.connect(ipAddress, port).sync().channel();
            channel = ch;
        }
        ByteBuf buf = encode(new GetMessage(pkgName, method, arg), id);
        // writeAndFlush takes over ownership of buf.
        ch.writeAndFlush(buf);
    }

    /**
     * Serializes the request id and message into a fresh buffer.
     *
     * @return a buffer owned by the caller
     * @throws IOException if serialization fails; the buffer is released in that case
     */
    private static ByteBuf encode(GetMessage msg, long id) throws IOException {
        ByteBuf buf = Unpooled.buffer();
        try {
            // Closing the stream flushes any bytes it still buffers into buf.
            try (ObjectOutputStream oo = new ObjectOutputStream(new ByteBufOutputStream(buf))) {
                oo.writeLong(id);
                oo.writeObject(msg);
            }
            return buf;
        } catch (IOException | RuntimeException e) {
            buf.release();
            throw e;
        }
    }

    /**
     * Picks a random request id that is not currently present in {@code r.waitObj}, logging and
     * retrying on a collision so that two outstanding requests never share an id.
     */
    private long nextRequestId() {
        long id = random.nextLong();
        while (r.waitObj.containsKey(id)) {
            LOGGER.warn("id conflict! " + id);
            id = random.nextLong();
        }
        return id;
    }

    /** Returns the exception message, falling back to {@code toString()} when it is null. */
    private static String describe(Exception e) {
        String message = e.getMessage();
        return message != null ? message : e.toString();
    }

    /** Re-asserts the thread's interrupt flag when a broad catch has consumed an interrupt. */
    private static void restoreInterrupt(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends a request and blocks for its result via {@code SyncResult.syncSleep}.
     *
     * <p>The registered callback marks the pre-built "Timeout!!" {@link ContractResult} as
     * successful, stores the response in it and notifies waiters on the callback object.
     *
     * @return whatever {@code SyncResult.syncSleep} returns, or a JSON-serialized error
     *     {@link ContractResult} if sending or waiting throws
     */
    public String syncGet(String pkgName, String method, String arg) {
        long id = nextRequestId();
        boolean registered = false;
        try {
            final ContractResult cr =
                    new ContractResult(ContractResult.Status.Error, new JsonPrimitive("Timeout!!"));
            ResultCallback cb =
                    new ResultCallback() {
                        @Override
                        public void onResult(String str) {
                            cr.status = ContractResult.Status.Success;
                            cr.result = null == str ? null : new JsonPrimitive(str);
                            synchronized (this) {
                                this.notifyAll();
                            }
                        }
                    };
            r.waitObj.put(id, cb);
            registered = true;
            request(pkgName, method, arg, id);
            return r.syncSleep(id, cr, cb);
        } catch (Exception e) {
            restoreInterrupt(e);
            if (registered) {
                // Do not leave a callback behind for a request that failed.
                r.waitObj.remove(id);
            }
            LOGGER.error("syncGet failed", e);
            final ContractResult cr =
                    new ContractResult(
                            ContractResult.Status.Error,
                            new JsonPrimitive("Failed due to Exception:" + describe(e)));
            return JsonUtil.toJson(cr);
        }
    }

    /**
     * Sends a request and returns immediately; the response is delivered to {@code cb}.
     *
     * <p>If sending fails, {@code cb} (when non-null) receives a JSON-serialized error
     * {@link ContractResult}. When the connection is additionally down and an offline handler is
     * set, that handler is notified first and the error text is replaced by an "unavailable"
     * message.
     *
     * @param cb callback for the response; may be {@code null}, in which case nothing is
     *     registered in {@code r.waitObj}
     */
    public void asyncGet(String pkgName, String method, String arg, ResultCallback cb) {
        long id = nextRequestId();
        boolean registered = false;
        try {
            if (null != cb) {
                r.waitObj.put(id, cb);
                registered = true;
            }
            r.sleep(id, cb);
            request(pkgName, method, arg, id);
        } catch (Exception e) {
            restoreInterrupt(e);
            if (registered) {
                // The failure is reported below; drop the pending entry for this id.
                r.waitObj.remove(id);
            }
            LOGGER.error("asyncGet failed", e);
            // describe() guards against a null message, which JsonPrimitive would reject and
            // thereby prevent the callback from ever being invoked.
            String reason = describe(e);
            ContractResult cr =
                    new ContractResult(ContractResult.Status.Error, new JsonPrimitive(reason));
            if (!isAlive() && offlineExceptionHandler != null) {
                offlineExceptionHandler.onException(this, e);
                cr.result = new JsonPrimitive("unavailable now try again later, " + reason);
            }
            if (null != cb) {
                cb.onResult(JsonUtil.toJson(cr));
            }
        }
    }

    /** Returns {@code true} when a channel exists and reports itself active. */
    public boolean isAlive() {
        Channel ch = channel;
        return ch != null && ch.isActive();
    }

    /** Decodes response frames and hands them to {@code r.wakeUp}; tracks disconnects. */
    private final class ResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf frame) throws Exception {
            // NOTE(review): this uses Java-native deserialization on bytes received from the
            // network; if the peer is not fully trusted, consider an ObjectInputFilter or a
            // different encoding.
            try (ObjectInputStream in = new ObjectInputStream(new ByteBufInputStream(frame))) {
                long id = in.readLong();
                // TODO zero copy here
                String result = (String) in.readObject();
                r.wakeUp(id, result);
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            // Only forget the channel that actually closed; a stale event from an older
            // connection must not discard a newer, healthy one.
            if (SocketGet.this.channel == ctx.channel()) {
                SocketGet.this.channel = null;
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            LOGGER.warn("exception on channel " + ctx.channel(), cause);
        }
    }

    /** Callback notified by {@link #asyncGet} when a request fails while offline. */
    public abstract static class OfflineHandler {
        /**
         * @param socketGet the client on which the failure occurred
         * @param e the exception that caused the request to fail
         */
        public abstract void onException(SocketGet socketGet, Exception e);
    }
}
